Add a GNU make jobserver implementation to Cargo
authorAlex Crichton <alex@alexcrichton.com>
Tue, 30 May 2017 04:09:53 +0000 (21:09 -0700)
committerAlex Crichton <alex@alexcrichton.com>
Fri, 2 Jun 2017 15:06:30 +0000 (08:06 -0700)
This commit adds a GNU make jobserver implementation to Cargo, both as a client
of existing jobservers and also a creator of new jobservers. The jobserver is
actually just an IPC semaphore which manifests itself as a pipe with N bytes
of tokens on Unix and a literal IPC semaphore on Windows. The rough protocol
is then if you want to run a job you read acquire the semaphore (read a byte on
Unix or wait on the semaphore on Windows) and then you release it when you're
done.

All the hairy details of the jobserver implementation are housed in the
`jobserver` crate on crates.io instead of Cargo. This should hopefully make it
much easier for the compiler to also share a jobserver implementation
eventually.

The main tricky bit here is that on Unix and Windows acquiring a jobserver token
will block the calling thread. We need to either way for a running job to exit
or to acquire a new token when we want to spawn a new job. To handle this the
current implementation spawns a helper thread that does the blocking and sends a
message back to Cargo when it receives a token. It's a little trickier with
shutting down this thread gracefully as well but more details can be found in
the `jobserver` crate.

Unfortunately crates are unlikely to see an immediate benefit of this once
implemented. Most crates are run with a manual `make -jN` and this overrides the
jobserver in the environment, creating a new jobserver in the sub-make. If the
`-jN` argument is removed, however, then `make` will share Cargo's jobserver and
properly limit parallelism.

Closes #1744

14 files changed:
Cargo.lock
Cargo.toml
src/cargo/lib.rs
src/cargo/ops/cargo_clean.rs
src/cargo/ops/cargo_compile.rs
src/cargo/ops/cargo_rustc/context.rs
src/cargo/ops/cargo_rustc/custom_build.rs
src/cargo/ops/cargo_rustc/job_queue.rs
src/cargo/ops/cargo_rustc/mod.rs
src/cargo/util/config.rs
src/cargo/util/process_builder.rs
src/doc/environment-variables.md
tests/cargotest/lib.rs
tests/jobserver.rs [new file with mode: 0644]

index c0d530e2f14641718b34b893ec6fdd3e0ff8ec36..ebb7e9b836722b576df112a8fbc212d2525000e5 100644 (file)
@@ -18,6 +18,7 @@ dependencies = [
  "git2-curl 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "glob 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
  "hamcrest 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "jobserver 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
  "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
  "libc 0.2.23 (registry+https://github.com/rust-lang/crates.io-index)",
  "libgit2-sys 0.6.11 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -323,6 +324,17 @@ name = "itoa"
 version = "0.3.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 
+[[package]]
+name = "jobserver"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "libc 0.2.23 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rand 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)",
+ "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
 [[package]]
 name = "kernel32-sys"
 version = "0.2.2"
@@ -874,6 +886,7 @@ dependencies = [
 "checksum hamcrest 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "bf088f042a467089e9baa4972f57f9247e42a0cc549ba264c7a04fbb8ecb89d4"
 "checksum idna 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2233d4940b1f19f0418c158509cd7396b8d70a5db5705ce410914dc8fa603b37"
 "checksum itoa 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "eb2f404fbc66fd9aac13e998248505e7ecb2ad8e44ab6388684c5fb11c6c251c"
+"checksum jobserver 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3d97367c7112f0a46029cdf826c442bb71061789b0dcd1f0e71ce56ce3ee7a6a"
 "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
 "checksum lazy_static 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3b37545ab726dd833ec6420aaba8231c5b320814b9029ad585555d2a03e94fbf"
 "checksum libc 0.2.23 (registry+https://github.com/rust-lang/crates.io-index)" = "e7eb6b826bfc1fdea7935d46556250d1799b7fe2d9f7951071f4291710665e3e"
index c8df5d84f5cf29542a9158f8949298a258fee556..7a4065ccacd7a42e2e25d8fb25b827b16d90a0bc 100644 (file)
@@ -29,6 +29,7 @@ fs2 = "0.4"
 git2 = "0.6"
 git2-curl = "0.7"
 glob = "0.2"
+jobserver = "0.1.2"
 libc = "0.2"
 libgit2-sys = "0.6"
 log = "0.3"
@@ -37,8 +38,8 @@ rustc-serialize = "0.3"
 semver = { version = "0.7.0", features = ["serde"] }
 serde = "1.0"
 serde_derive = "1.0"
-serde_json = "1.0"
 serde_ignored = "0.0.3"
+serde_json = "1.0"
 shell-escape = "0.1"
 tar = { version = "0.4", default-features = false }
 tempdir = "0.3"
index a03662927b9fcd94f6dfd129995fd68fabec3e39..5bce0e59e4571626ff33b9d0cfb470d393df705a 100755 (executable)
@@ -16,6 +16,7 @@ extern crate flate2;
 extern crate fs2;
 extern crate git2;
 extern crate glob;
+extern crate jobserver;
 extern crate libc;
 extern crate libgit2_sys;
 extern crate num_cpus;
index c7f1b28f8b6f512ea39ab986737e2550eab7440c..8a15bf19690e44fd04481fe323355507f72d4f81 100644 (file)
@@ -37,6 +37,7 @@ pub fn clean(ws: &Workspace, opts: &CleanOptions) -> CargoResult<()> {
                                        host_triple: host_triple,
                                        requested_target: opts.target.map(|s| s.to_owned()),
                                        release: opts.release,
+                                       jobs: 1,
                                        ..BuildConfig::default()
                                    },
                                    profiles)?;
index 81e757c02c7e7da05b9119b49cc7b97a74bc01d8..888939a22034681caad923abd02718cdaf573c71 100644 (file)
@@ -639,6 +639,11 @@ fn scrape_build_config(config: &Config,
                        jobs: Option<u32>,
                        target: Option<String>)
                        -> CargoResult<ops::BuildConfig> {
+    if jobs.is_some() && config.jobserver_from_env().is_some() {
+        config.shell().warn("a `-j` argument was passed to Cargo but Cargo is \
+                             also configured with an external jobserver in \
+                             its environment, ignoring the `-j` parameter")?;
+    }
     let cfg_jobs = match config.get_i64("build.jobs")? {
         Some(v) => {
             if v.val <= 0 {
index f78e67049a721b59372fafcf403e4366fcaaa8ce..9842306bd3774d0b933cb544a68e8a6a5be6180b 100644 (file)
@@ -8,6 +8,8 @@ use std::path::{Path, PathBuf};
 use std::str::{self, FromStr};
 use std::sync::Arc;
 
+use jobserver::Client;
+
 use core::{Package, PackageId, PackageSet, Resolve, Target, Profile};
 use core::{TargetKind, Profiles, Dependency, Workspace};
 use core::dependency::Kind as DepKind;
@@ -43,6 +45,7 @@ pub struct Context<'a, 'cfg: 'a> {
     pub build_scripts: HashMap<Unit<'a>, Arc<BuildScripts>>,
     pub links: Links<'a>,
     pub used_in_plugin: HashSet<Unit<'a>>,
+    pub jobserver: Client,
 
     host: Layout,
     target: Option<Layout>,
@@ -94,6 +97,21 @@ impl<'a, 'cfg> Context<'a, 'cfg> {
             config.rustc()?.verbose_version.contains("-dev");
         let incremental_enabled = incremental_enabled && is_nightly;
 
+        // Load up the jobserver that we'll use to manage our parallelism. This
+        // is the same as the GNU make implementation of a jobserver, and
+        // intentionally so! It's hoped that we can interact with GNU make and
+        // all share the same jobserver.
+        //
+        // Note that if we don't have a jobserver in our environment then we
+        // create our own, and we create it with `n-1` tokens because one token
+        // is ourself, a running process.
+        let jobserver = match config.jobserver_from_env() {
+            Some(c) => c.clone(),
+            None => Client::new(build_config.jobs as usize - 1).chain_err(|| {
+                "failed to create jobserver"
+            })?,
+        };
+
         Ok(Context {
             ws: ws,
             host: host_layout,
@@ -114,6 +132,7 @@ impl<'a, 'cfg> Context<'a, 'cfg> {
             links: Links::new(),
             used_in_plugin: HashSet::new(),
             incremental_enabled: incremental_enabled,
+            jobserver: jobserver,
         })
     }
 
index 3e988865a2a7cb8833213e8cf269a06e16fc57fb..6bb2620fdaed95743970bd702405d155d24b4ccb 100644 (file)
@@ -115,7 +115,8 @@ fn build_work<'a, 'cfg>(cx: &mut Context<'a, 'cfg>, unit: &Unit<'a>)
        .env("PROFILE", if cx.build_config.release { "release" } else { "debug" })
        .env("HOST", cx.host_triple())
        .env("RUSTC", &cx.config.rustc()?.path)
-       .env("RUSTDOC", &*cx.config.rustdoc()?);
+       .env("RUSTDOC", &*cx.config.rustdoc()?)
+       .inherit_jobserver(&cx.jobserver);
 
     if let Some(links) = unit.pkg.manifest().links() {
         cmd.env("CARGO_MANIFEST_LINKS", links);
index bfa374780d3ac41cc8fad72a31015eab0b7b394f..7bb3e35e12673b87857a896a95ea75d294415317 100644 (file)
@@ -1,15 +1,17 @@
 use std::collections::HashSet;
 use std::collections::hash_map::HashMap;
 use std::fmt;
-use std::io::Write;
+use std::io::{self, Write};
+use std::mem;
 use std::sync::mpsc::{channel, Sender, Receiver};
 
 use crossbeam::{self, Scope};
+use jobserver::{Acquired, HelperThread};
 use term::color::YELLOW;
 
 use core::{PackageId, Target, Profile};
 use util::{Config, DependencyQueue, Fresh, Dirty, Freshness};
-use util::{CargoResult, ProcessBuilder, profile, internal};
+use util::{CargoResult, ProcessBuilder, profile, internal, CargoResultExt};
 use {handle_error};
 
 use super::{Context, Kind, Unit};
@@ -21,10 +23,9 @@ use super::job::Job;
 /// actual compilation step of each package. Packages enqueue units of work and
 /// then later on the entire graph is processed and compiled.
 pub struct JobQueue<'a> {
-    jobs: usize,
     queue: DependencyQueue<Key<'a>, Vec<(Job, Freshness)>>,
-    tx: Sender<(Key<'a>, Message)>,
-    rx: Receiver<(Key<'a>, Message)>,
+    tx: Sender<Message<'a>>,
+    rx: Receiver<Message<'a>>,
     active: usize,
     pending: HashMap<Key<'a>, PendingBuild>,
     compiled: HashSet<&'a PackageId>,
@@ -51,28 +52,28 @@ struct Key<'a> {
 }
 
 pub struct JobState<'a> {
-    tx: Sender<(Key<'a>, Message)>,
-    key: Key<'a>,
+    tx: Sender<Message<'a>>,
 }
 
-enum Message {
+enum Message<'a> {
     Run(String),
     Stdout(String),
     Stderr(String),
-    Finish(CargoResult<()>),
+    Token(io::Result<Acquired>),
+    Finish(Key<'a>, CargoResult<()>),
 }
 
 impl<'a> JobState<'a> {
     pub fn running(&self, cmd: &ProcessBuilder) {
-        let _ = self.tx.send((self.key, Message::Run(cmd.to_string())));
+        let _ = self.tx.send(Message::Run(cmd.to_string()));
     }
 
     pub fn stdout(&self, out: &str) {
-        let _ = self.tx.send((self.key, Message::Stdout(out.to_string())));
+        let _ = self.tx.send(Message::Stdout(out.to_string()));
     }
 
     pub fn stderr(&self, err: &str) {
-        let _ = self.tx.send((self.key, Message::Stderr(err.to_string())));
+        let _ = self.tx.send(Message::Stderr(err.to_string()));
     }
 }
 
@@ -80,7 +81,6 @@ impl<'a> JobQueue<'a> {
     pub fn new<'cfg>(cx: &Context<'a, 'cfg>) -> JobQueue<'a> {
         let (tx, rx) = channel();
         JobQueue {
-            jobs: cx.jobs() as usize,
             queue: DependencyQueue::new(),
             tx: tx,
             rx: rx,
@@ -113,23 +113,51 @@ impl<'a> JobQueue<'a> {
     pub fn execute(&mut self, cx: &mut Context) -> CargoResult<()> {
         let _p = profile::start("executing the job graph");
 
+        // We need to give a handle to the send half of our message queue to the
+        // jobserver helper thrad. Unfortunately though we need the handle to be
+        // `'static` as that's typically what's required when spawning a
+        // thread!
+        //
+        // To work around this we transmute the `Sender` to a static lifetime.
+        // we're only sending "longer living" messages and we should also
+        // destroy all references to the channel before this function exits as
+        // the destructor for the `helper` object will ensure the associated
+        // thread i sno longer running.
+        //
+        // As a result, this `transmute` to a longer lifetime should be safe in
+        // practice.
+        let tx = self.tx.clone();
+        let tx = unsafe {
+            mem::transmute::<Sender<Message<'a>>, Sender<Message<'static>>>(tx)
+        };
+        let helper = cx.jobserver.clone().into_helper_thread(move |token| {
+            drop(tx.send(Message::Token(token)));
+        }).chain_err(|| {
+            "failed to create helper thread for jobserver management"
+        })?;
+
         crossbeam::scope(|scope| {
-            self.drain_the_queue(cx, scope)
+            self.drain_the_queue(cx, scope, &helper)
         })
     }
 
-    fn drain_the_queue(&mut self, cx: &mut Context, scope: &Scope<'a>)
+    fn drain_the_queue(&mut self,
+                       cx: &mut Context,
+                       scope: &Scope<'a>,
+                       jobserver_helper: &HelperThread)
                        -> CargoResult<()> {
         use std::time::Instant;
 
+        let mut tokens = Vec::new();
         let mut queue = Vec::new();
         trace!("queue: {:#?}", self.queue);
 
         // Iteratively execute the entire dependency graph. Each turn of the
         // loop starts out by scheduling as much work as possible (up to the
-        // maximum number of parallel jobs). A local queue is maintained
-        // separately from the main dependency queue as one dequeue may actually
-        // dequeue quite a bit of work (e.g. 10 binaries in one project).
+        // maximum number of parallel jobs we have tokens for). A local queue
+        // is maintained separately from the main dependency queue as one
+        // dequeue may actually dequeue quite a bit of work (e.g. 10 binaries
+        // in one project).
         //
         // After a job has finished we update our internal state if it was
         // successful and otherwise wait for pending work to finish if it failed
@@ -137,32 +165,48 @@ impl<'a> JobQueue<'a> {
         let mut error = None;
         let start_time = Instant::now();
         loop {
-            while error.is_none() && self.active < self.jobs {
-                if !queue.is_empty() {
-                    let (key, job, fresh) = queue.remove(0);
-                    self.run(key, fresh, job, cx.config, scope)?;
-                } else if let Some((fresh, key, jobs)) = self.queue.dequeue() {
-                    let total_fresh = jobs.iter().fold(fresh, |fresh, &(_, f)| {
-                        f.combine(fresh)
-                    });
-                    self.pending.insert(key, PendingBuild {
-                        amt: jobs.len(),
-                        fresh: total_fresh,
-                    });
-                    queue.extend(jobs.into_iter().map(|(job, f)| {
-                        (key, job, f.combine(fresh))
-                    }));
-                } else {
-                    break
+            // Dequeue as much work as we can, learning about everything
+            // possible that can run. Note that this is also the point where we
+            // start requesting job tokens. Each job after the first needs to
+            // request a token.
+            while let Some((fresh, key, jobs)) = self.queue.dequeue() {
+                let total_fresh = jobs.iter().fold(fresh, |fresh, &(_, f)| {
+                    f.combine(fresh)
+                });
+                self.pending.insert(key, PendingBuild {
+                    amt: jobs.len(),
+                    fresh: total_fresh,
+                });
+                for (job, f) in jobs {
+                    queue.push((key, job, f.combine(fresh)));
+                    if self.active + queue.len() > 0 {
+                        jobserver_helper.request_token();
+                    }
                 }
             }
+
+            // Now that we've learned of all possible work that we can execute
+            // try to spawn it so long as we've got a jobserver token which says
+            // we're able to perform some parallel work.
+            while error.is_none() && self.active < tokens.len() + 1 && !queue.is_empty() {
+                let (key, job, fresh) = queue.remove(0);
+                self.run(key, fresh, job, cx.config, scope)?;
+            }
+
+            // If after all that we're not actually running anything then we're
+            // done!
             if self.active == 0 {
                 break
             }
 
-            let (key, msg) = self.rx.recv().unwrap();
+            // And finally, before we block waiting for the next event, drop any
+            // excess tokens we may have accidentally acquired. Due to how our
+            // jobserver interface is architected we may acquire a token that we
+            // don't actually use, and if this happens just relinquish it back
+            // to the jobserver itself.
+            tokens.truncate(self.active - 1);
 
-            match msg {
+            match self.rx.recv().unwrap() {
                 Message::Run(cmd) => {
                     cx.config.shell().verbose(|c| c.status("Running", &cmd))?;
                 }
@@ -176,9 +220,13 @@ impl<'a> JobQueue<'a> {
                         writeln!(cx.config.shell().err(), "{}", err)?;
                     }
                 }
-                Message::Finish(result) => {
+                Message::Finish(key, result) => {
                     info!("end: {:?}", key);
                     self.active -= 1;
+                    if self.active > 0 {
+                        assert!(tokens.len() > 0);
+                        drop(tokens.pop());
+                    }
                     match result {
                         Ok(()) => self.finish(key, cx)?,
                         Err(e) => {
@@ -198,6 +246,11 @@ impl<'a> JobQueue<'a> {
                         }
                     }
                 }
+                Message::Token(acquired_token) => {
+                    tokens.push(acquired_token.chain_err(|| {
+                        "failed to acquire jobserver token"
+                    })?);
+                }
             }
         }
 
@@ -244,9 +297,8 @@ impl<'a> JobQueue<'a> {
         scope.spawn(move || {
             let res = job.run(fresh, &JobState {
                 tx: my_tx.clone(),
-                key: key,
             });
-            my_tx.send((key, Message::Finish(res))).unwrap();
+            my_tx.send(Message::Finish(key, res)).unwrap();
         });
 
         // Print out some nice progress information
index c2dd2a80656bf88c23fd166b1439a6e6c0726de9..58e069a1376ef79bc8fb71f85be08408fea90e5d 100644 (file)
@@ -66,6 +66,7 @@ pub type PackagesToBuild<'a> = [(&'a Package, Vec<(&'a Target, &'a Profile)>)];
 /// the build calls.
 pub trait Executor: Send + Sync + 'static {
     fn init(&self, _cx: &Context) {}
+
     /// If execution succeeds, the ContinueBuild value indicates whether Cargo
     /// should continue with the build process for this package.
     fn exec(&self, cmd: ProcessBuilder, _id: &PackageId) -> CargoResult<()> {
@@ -575,6 +576,7 @@ fn prepare_rustc(cx: &mut Context,
                  crate_types: Vec<&str>,
                  unit: &Unit) -> CargoResult<ProcessBuilder> {
     let mut base = cx.compilation.rustc_process(unit.pkg)?;
+    base.inherit_jobserver(&cx.jobserver);
     build_base_args(cx, &mut base, unit, &crate_types);
     build_deps_args(&mut base, cx, unit)?;
     Ok(base)
@@ -583,6 +585,7 @@ fn prepare_rustc(cx: &mut Context,
 
 fn rustdoc(cx: &mut Context, unit: &Unit) -> CargoResult<Work> {
     let mut rustdoc = cx.compilation.rustdoc_process(unit.pkg)?;
+    rustdoc.inherit_jobserver(&cx.jobserver);
     rustdoc.arg("--crate-name").arg(&unit.target.crate_name())
            .cwd(cx.config.cwd())
            .arg(&root_path(cx, unit));
index afaacae0674ef4fca5c9370b65e7c9912b2acc19..a8bfb2eed3e2ba5f783a817e4634dc19f58d29ab 100644 (file)
@@ -1,7 +1,7 @@
 use std::cell::{RefCell, RefMut, Cell};
+use std::collections::HashSet;
 use std::collections::hash_map::Entry::{Occupied, Vacant};
 use std::collections::hash_map::HashMap;
-use std::collections::HashSet;
 use std::env;
 use std::fmt;
 use std::fs::{self, File};
@@ -10,15 +10,17 @@ use std::io::prelude::*;
 use std::mem;
 use std::path::{Path, PathBuf};
 use std::str::FromStr;
+use std::sync::{Once, ONCE_INIT};
 
+use core::MultiShell;
+use core::shell::{Verbosity, ColorConfig};
+use jobserver;
 use rustc_serialize::{Encodable,Encoder};
 use toml;
-use core::shell::{Verbosity, ColorConfig};
-use core::MultiShell;
 use util::Rustc;
 use util::errors::{CargoResult, CargoResultExt, CargoError, internal};
-use util::{Filesystem, LazyCell};
 use util::paths;
+use util::{Filesystem, LazyCell};
 
 use util::toml as cargo_toml;
 
@@ -35,12 +37,24 @@ pub struct Config {
     extra_verbose: Cell<bool>,
     frozen: Cell<bool>,
     locked: Cell<bool>,
+    jobserver: Option<jobserver::Client>,
 }
 
 impl Config {
     pub fn new(shell: MultiShell,
                cwd: PathBuf,
                homedir: PathBuf) -> Config {
+        static mut GLOBAL_JOBSERVER: *mut jobserver::Client = 0 as *mut _;
+        static INIT: Once = ONCE_INIT;
+
+        // This should be called early on in the process, so in theory the
+        // unsafety is ok here. (taken ownership of random fds)
+        INIT.call_once(|| unsafe {
+            if let Some(client) = jobserver::Client::from_env() {
+                GLOBAL_JOBSERVER = Box::into_raw(Box::new(client));
+            }
+        });
+
         Config {
             home_path: Filesystem::new(homedir),
             shell: RefCell::new(shell),
@@ -52,6 +66,13 @@ impl Config {
             extra_verbose: Cell::new(false),
             frozen: Cell::new(false),
             locked: Cell::new(false),
+            jobserver: unsafe {
+                if GLOBAL_JOBSERVER.is_null() {
+                    None
+                } else {
+                    Some((*GLOBAL_JOBSERVER).clone())
+                }
+            },
         }
     }
 
@@ -458,6 +479,10 @@ impl Config {
         self.maybe_get_tool(tool)
             .map(|t| t.unwrap_or(PathBuf::from(tool)))
     }
+
+    pub fn jobserver_from_env(&self) -> Option<&jobserver::Client> {
+        self.jobserver.as_ref()
+    }
 }
 
 #[derive(Eq, PartialEq, Clone, Copy)]
index 819771981eeb93efc4c6b17304903d5f55490bd5..5f8021d11bb30a66bdcabb9288a61d58e17c33bd 100644 (file)
@@ -5,16 +5,19 @@ use std::fmt;
 use std::path::Path;
 use std::process::{Command, Stdio, Output};
 
+use jobserver::Client;
+use shell_escape::escape;
+
 use util::{CargoResult, CargoResultExt, CargoError, process_error, read2};
 use util::errors::CargoErrorKind;
-use shell_escape::escape;
 
-#[derive(Clone, PartialEq, Debug)]
+#[derive(Clone, Debug)]
 pub struct ProcessBuilder {
     program: OsString,
     args: Vec<OsString>,
     env: HashMap<String, Option<OsString>>,
     cwd: Option<OsString>,
+    jobserver: Option<Client>,
 }
 
 impl fmt::Display for ProcessBuilder {
@@ -73,6 +76,11 @@ impl ProcessBuilder {
 
     pub fn get_envs(&self) -> &HashMap<String, Option<OsString>> { &self.env }
 
+    pub fn inherit_jobserver(&mut self, jobserver: &Client) -> &mut Self {
+        self.jobserver = Some(jobserver.clone());
+        self
+    }
+
     pub fn exec(&self) -> CargoResult<()> {
         let mut command = self.build_command();
         let exit = command.status().chain_err(|| {
@@ -207,6 +215,9 @@ impl ProcessBuilder {
                 None => { command.env_remove(k); }
             }
         }
+        if let Some(ref c) = self.jobserver {
+            c.configure(&mut command);
+        }
         command
     }
 
@@ -226,5 +237,6 @@ pub fn process<T: AsRef<OsStr>>(cmd: T) -> ProcessBuilder {
         args: Vec::new(),
         cwd: None,
         env: HashMap::new(),
+        jobserver: None,
     }
 }
index 77bfa71052e47cf8b2ddce9f66710f0ae4d57775..276bb903ca812f2013c9ad9e30e1d7b34c2624ee 100644 (file)
@@ -88,7 +88,14 @@ let out_dir = env::var("OUT_DIR").unwrap();
              triples can be found in [clang’s own documentation][clang].
 * `HOST` - the host triple of the rust compiler.
 * `NUM_JOBS` - the parallelism specified as the top-level parallelism. This can
-               be useful to pass a `-j` parameter to a system like `make`.
+               be useful to pass a `-j` parameter to a system like `make`. Note
+               that care should be taken when interpreting this environment
+               variable. For historical purposes this is still provided but
+               recent versions of Cargo, for example, do not need to run `make
+               -j` as it'll automatically happen. Cargo implements its own
+               [jobserver] and will allow build scripts to inherit this
+               information, so programs compatible with GNU make jobservers will
+               already have appropriately configured parallelism.
 * `OPT_LEVEL`, `DEBUG` - values of the corresponding variables for the
                          profile currently being built.
 * `PROFILE` - `release` for release builds, `debug` for other builds.
@@ -101,6 +108,7 @@ let out_dir = env::var("OUT_DIR").unwrap();
 [links]: build-script.html#the-links-manifest-key
 [profile]: manifest.html#the-profile-sections
 [clang]:http://clang.llvm.org/docs/CrossCompilation.html#target-triple
+[jobserver]: http://make.mad-scientist.net/papers/jobserver-implementation/
 
 # Environment variables Cargo sets for 3rd party subcommands
 
index e8497285df13f1f5d5b6bcdd99a60777fe73eb05..33e58c0e2e5262528f31deb9a8da6a4d639c8608 100644 (file)
@@ -60,6 +60,8 @@ fn _process(t: &OsStr) -> cargo::util::ProcessBuilder {
      .env_remove("XDG_CONFIG_HOME")      // see #2345
      .env("GIT_CONFIG_NOSYSTEM", "1")    // keep trying to sandbox ourselves
      .env_remove("EMAIL")
+     .env_remove("MFLAGS")
+     .env_remove("MAKEFLAGS")
      .env_remove("GIT_AUTHOR_NAME")
      .env_remove("GIT_AUTHOR_EMAIL")
      .env_remove("GIT_COMMITTER_NAME")
diff --git a/tests/jobserver.rs b/tests/jobserver.rs
new file mode 100644 (file)
index 0000000..7d055de
--- /dev/null
@@ -0,0 +1,184 @@
+extern crate cargotest;
+extern crate hamcrest;
+extern crate cargo;
+
+use std::str;
+use std::net::TcpListener;
+use std::thread;
+use std::process::Command;
+
+use cargotest::support::{project, execs, cargo_exe};
+use hamcrest::assert_that;
+
+#[test]
+fn jobserver_exists() {
+    let p = project("foo")
+        .file("Cargo.toml", r#"
+            [package]
+            name = "foo"
+            version = "0.0.1"
+            authors = []
+        "#)
+        .file("build.rs", r#"
+            use std::env;
+
+            fn main() {
+                let var = env::var("MAKEFLAGS").unwrap();
+                let arg = var.split(' ')
+                             .find(|p| p.starts_with("--jobserver"))
+                             .unwrap();
+                let val = &arg[arg.find('=').unwrap() + 1..];
+                validate(val);
+            }
+
+            #[cfg(unix)]
+            fn validate(s: &str) {
+                use std::fs::File;
+                use std::io::*;
+                use std::os::unix::prelude::*;
+
+                let fds = s.split(',').collect::<Vec<_>>();
+                println!("{}", s);
+                assert_eq!(fds.len(), 2);
+                unsafe {
+                    let mut read = File::from_raw_fd(fds[0].parse().unwrap());
+                    let mut write = File::from_raw_fd(fds[1].parse().unwrap());
+
+                    let mut buf = [0];
+                    assert_eq!(read.read(&mut buf).unwrap(), 1);
+                    assert_eq!(write.write(&buf).unwrap(), 1);
+                }
+            }
+
+            #[cfg(windows)]
+            fn validate(_: &str) {
+                // a little too complicated for a test...
+            }
+        "#)
+        .file("src/lib.rs", "");
+
+    assert_that(p.cargo_process("build"),
+                execs().with_status(0));
+}
+
+#[test]
+fn makes_jobserver_used() {
+    let make = if cfg!(windows) {"mingw32-make"} else {"make"};
+    if Command::new(make).arg("--version").output().is_err() {
+        return
+    }
+
+    let p = project("foo")
+        .file("Cargo.toml", r#"
+            [package]
+            name = "foo"
+            version = "0.0.1"
+            authors = []
+
+            [dependencies]
+            d1 = { path = "d1" }
+            d2 = { path = "d2" }
+            d3 = { path = "d3" }
+        "#)
+       .file("src/lib.rs", "")
+       .file("d1/Cargo.toml", r#"
+            [package]
+            name = "d1"
+            version = "0.0.1"
+            authors = []
+            build = "../dbuild.rs"
+        "#)
+       .file("d1/src/lib.rs", "")
+       .file("d2/Cargo.toml", r#"
+            [package]
+            name = "d2"
+            version = "0.0.1"
+            authors = []
+            build = "../dbuild.rs"
+        "#)
+       .file("d2/src/lib.rs", "")
+       .file("d3/Cargo.toml", r#"
+            [package]
+            name = "d3"
+            version = "0.0.1"
+            authors = []
+            build = "../dbuild.rs"
+        "#)
+       .file("d3/src/lib.rs", "")
+       .file("dbuild.rs", r#"
+            use std::net::TcpStream;
+            use std::env;
+            use std::io::Read;
+
+            fn main() {
+                let addr = env::var("ADDR").unwrap();
+                let mut stream = TcpStream::connect(addr).unwrap();
+                let mut v = Vec::new();
+                stream.read_to_end(&mut v).unwrap();
+            }
+       "#)
+       .file("Makefile", "\
+all:
+\t+$(CARGO) build
+");
+    p.build();
+
+    let l = TcpListener::bind("127.0.0.1:0").unwrap();
+    let addr = l.local_addr().unwrap();
+
+    let child = thread::spawn(move || {
+        let a1 = l.accept().unwrap();
+        let a2 = l.accept().unwrap();
+        l.set_nonblocking(true).unwrap();
+
+        for _ in 0..1000 {
+            assert!(l.accept().is_err());
+            thread::yield_now();
+        }
+
+        drop(a1);
+        l.set_nonblocking(false).unwrap();
+        let a3 = l.accept().unwrap();
+
+        drop((a2, a3));
+    });
+
+    assert_that(p.process(make)
+                 .env("CARGO", cargo_exe())
+                 .env("ADDR", addr.to_string())
+                 .arg("-j2"),
+                execs().with_status(0));
+    child.join().unwrap();
+}
+
+#[test]
+fn jobserver_and_j() {
+    let make = if cfg!(windows) {"mingw32-make"} else {"make"};
+    if Command::new(make).arg("--version").output().is_err() {
+        return
+    }
+
+    let p = project("foo")
+        .file("Cargo.toml", r#"
+            [package]
+            name = "foo"
+            version = "0.0.1"
+            authors = []
+        "#)
+       .file("src/lib.rs", "")
+       .file("Makefile", "\
+all:
+\t+$(CARGO) build -j2
+");
+    p.build();
+
+    assert_that(p.process(make)
+                 .env("CARGO", cargo_exe())
+                 .arg("-j2"),
+                execs().with_status(0).with_stderr("\
+warning: a `-j` argument was passed to Cargo but Cargo is also configured \
+with an external jobserver in its environment, ignoring the `-j` parameter
+[COMPILING] [..]
+[FINISHED] [..]
+"));
+}